package org.apache.rocketmq.proxy.grpc.v2;

import apache.rocketmq.v2.*;
import io.grpc.stub.StreamObserver;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.client.ClientActivity;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.producer.SendMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.route.RouteActivity;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

import java.util.concurrent.CompletableFuture;



/**
 * @课程描述:从零带你写框架系列中的课程，整个系列包含netty，xxl-job，rocketmq，nacos，sofajraft，spring，springboot，disruptor，编译器，虚拟机等等。
 * @author：陈清风扬，个人微信号：chenqingfengyangjj。
 * @date:2025/1/3
 * @方法描述：Grpc服务器默认使用的消息处理器
 */
public class DefaultGrpcMessingActivity extends AbstractStartAndShutdown implements GrpcMessingActivity {


    private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);

    //客户端配置管理器
    protected GrpcClientSettingsManager grpcClientSettingsManager;

    //Grpc通道管理器
    protected GrpcChannelManager grpcChannelManager;

    //处理根据主题获取路由信息请求的活动处理器
    protected RouteActivity routeActivity;

    //处理客户端操作的活动器
    protected ClientActivity clientActivity;

    //消息处理器
    protected SendMessageActivity sendMessageActivity;

    //构造方法
    protected DefaultGrpcMessingActivity(MessagingProcessor messagingProcessor) {
        this.init(messagingProcessor);
    }


    /**
     * @课程描述:从零带你写框架系列中的课程，整个系列包含netty，xxl-job，rocketmq，nacos，sofajraft，spring，springboot，disruptor，编译器，虚拟机等等。
     * @author：陈清风扬，个人微信号：chenqingfengyangjj。
     * @date:2025/1/3
     * @方法描述：初始化默认消息处理器的方法
     */
    protected void init(MessagingProcessor messagingProcessor) {
        this.grpcClientSettingsManager = new GrpcClientSettingsManager(messagingProcessor);
        this.grpcChannelManager = new GrpcChannelManager(messagingProcessor.getProxyRelayService(),this.grpcClientSettingsManager);
        //创建主题路由消息处理器，这里可以看到，这个主题路由消息处理器持有了messagingProcessor消息处理器
        //这又一次证明了，最终处理请求的是最开始创建的这个messagingProcessor消息处理器
        this.sendMessageActivity = new SendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
        this.routeActivity = new RouteActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
        this.clientActivity = new ClientActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
        this.appendStartAndShutdown(this.grpcClientSettingsManager);
    }


    /**
     * @课程描述:从零带你写框架系列中的课程，整个系列包含netty，xxl-job，rocketmq，nacos，sofajraft，spring，springboot，disruptor，编译器，虚拟机等等。
     * @author：陈清风扬，个人微信号：chenqingfengyangjj。
     * @date:2025/1/3
     * @方法描述：根据主题查询路由信息的方法
     */
    @Override
    public CompletableFuture<QueryRouteResponse> queryRoute(ProxyContext ctx, QueryRouteRequest request) {
        return this.routeActivity.queryRoute(ctx, request);
    }



    @Override
    public CompletableFuture<HeartbeatResponse> heartbeat(ProxyContext ctx, HeartbeatRequest request) {
        return this.clientActivity.heartbeat(ctx, request);
    }

    @Override
    public ContextStreamObserver<TelemetryCommand> telemetry(StreamObserver<TelemetryCommand> responseObserver) {
        return this.clientActivity.telemetry(responseObserver);
    }


    //其他方法暂时不做实现

    @Override
    public CompletableFuture<SendMessageResponse> sendMessage(ProxyContext ctx, SendMessageRequest request) {
        return this.sendMessageActivity.sendMessage(ctx, request);
    }

    @Override
    public CompletableFuture<QueryAssignmentResponse> queryAssignment(ProxyContext ctx, QueryAssignmentRequest request) {
        return null;
    }

    @Override
    public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, StreamObserver<ReceiveMessageResponse> responseObserver) {

    }

    @Override
    public CompletableFuture<AckMessageResponse> ackMessage(ProxyContext ctx, AckMessageRequest request) {
        return null;
    }

    @Override
    public CompletableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(ProxyContext ctx, ForwardMessageToDeadLetterQueueRequest request) {
        return null;
    }

    @Override
    public CompletableFuture<EndTransactionResponse> endTransaction(ProxyContext ctx, EndTransactionRequest request) {
        return null;
    }

    @Override
    public CompletableFuture<NotifyClientTerminationResponse> notifyClientTermination(ProxyContext ctx, NotifyClientTerminationRequest request) {
        return null;
    }

    @Override
    public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(ProxyContext ctx, ChangeInvisibleDurationRequest request) {
        return null;
    }

}
